iT邦幫忙

2023 iThome 鐵人賽

DAY 21
0
DevOps

大家都在用 Terraform 實作 IaC 為什麼不將程式寫得更簡潔易讀呢?系列 第 21 篇

實作 AWS 常用服務之 Terraform 模組系列 - MSK 篇

  • 分享至 

  • xImage
  •  

AWS MSK 模組實作

本篇是實作常用的 AWS MSK 服務之 Terraform 模組,完整的專案程式碼分享在我的 Github 上。

  1. 先定義模組 my_msk 的放置位置 modules/my_msk:
├── configs
│   ├── iam
│   │   ├── assume_role_policies
│   │   ├── policies
│   │   ├── role_policies
│   │   ├── user_policies
│   │   └── iam.yaml
│   ├── s3
│   │   ├── policies
│   │   └── s3.yaml
│   ├── subnet
│   │   └── my-subnets.yaml
│   └── vpc
│       └── my-vpcs.yaml
├── example.tfvars
├── locals.tf
├── main.tf
├── modules
│   ├── my_cloudfront
│   ├── my_cloudwatch
│   ├── my_eips
│   ├── my_eventbridge
│   ├── my_iam
│   ├── my_igw
│   ├── my_instances
│   ├── my_kinesis_stream
│   ├── my_kms
│   ├── my_msk
│   │   ├── cluster.tf
│   │   ├── configuration.tf
│   │   ├── outputs.tf
│   │   ├── provider.tf
│   │   ├── scram-secret-association.tf
│   │   ├── storage_autoscale.tf
│   │   └── variables.tf
│   ├── my_nacls
│   ├── my_route_tables
│   ├── my_s3
│   ├── my_subnets
│   └── my_vpc
└── variables.tf
  2. 撰寫 my_msk 模組:
  • ./modules/my_msk/outputs.tf:
# Expose the SASL/SCRAM bootstrap broker endpoints so the root module can
# hand connection strings to Kafka clients authenticating with the
# Secrets Manager users registered in scram-secret-association.tf.
output "kafka_bootstrap_brokers_sasl_scram" {
  # Bug fix: the cluster resource is declared as aws_msk_cluster.kafka in
  # cluster.tf, not "my_kafka" — the original reference failed at plan time.
  value = aws_msk_cluster.kafka.bootstrap_brokers_sasl_scram
}

  • ./modules/my_msk/provider.tf:
# AWS provider configuration for the my_msk module; the region and
# credentials profile are supplied by the caller via module variables.
provider "aws" {
  region  = var.aws_region
  profile = var.aws_profile
}
  • ./modules/my_msk/variables.tf:
# Input variables for the my_msk module.

variable "aws_region" {
  type        = string
  description = "AWS region"
  default     = "ap-northeast-1"
}

variable "aws_profile" {
  type        = string
  description = "AWS profile"
  default     = ""
}

variable "project_name" {
  type        = string
  description = "Project name"
  default     = ""
}

variable "department_name" {
  type        = string
  description = "Department name"
  default     = "SRE"
}

variable "msk_users" {
  # Each element must carry `username` and `password`; they become
  # Secrets Manager secrets associated to the cluster for SASL/SCRAM auth.
  type        = list(any)
  description = "SASL/SCRAM users to register with the MSK cluster"
}

variable "server_properties" {
  type        = string
  description = "Kafka server.properties content for the MSK configuration"
}

variable "kafka_cluster_name" {
  type        = string
  description = "Name of the MSK cluster"
}

variable "kafka_version" {
  type        = string
  description = "Kafka version for brokers and the MSK configuration"
}

variable "kafka_number_of_broker_nodes" {
  # Bug fix: this was typed `string`; the aws_msk_cluster argument is a
  # number and callers pass a numeric literal. A string like "2" still
  # converts, so this change is backward compatible.
  type        = number
  description = "Total number of broker nodes (must be a multiple of the AZ count)"
}

variable "kafka_instance_type" {
  type        = string
  description = "Broker instance type, e.g. kafka.t3.small"
}

variable "kafka_ebs_volume_size" {
  type        = number
  description = "Initial EBS volume size (GiB) per broker"
}

variable "kafka_client_subnets" {
  type        = list(string)
  description = "Subnet IDs for the broker nodes"
}

variable "kafka_security_groups" {
  type        = list(string)
  description = "Security group IDs attached to the brokers"
}

variable "kafka_scaling_max_capacity" {
  type        = number
  description = "Upper bound (GiB) for broker storage autoscaling"
}

variable "kafka_log_group_name" {
  type        = string
  description = "CloudWatch log group for broker logs"
  default     = "/nxd/kafka"
}

  • ./modules/my_msk/cluster.tf:
# MSK cluster with SASL/SCRAM client authentication, Prometheus open
# monitoring (JMX + node exporters) and broker logs shipped to CloudWatch.
resource "aws_msk_cluster" "kafka" {
  cluster_name           = var.kafka_cluster_name
  kafka_version          = var.kafka_version
  number_of_broker_nodes = var.kafka_number_of_broker_nodes

  broker_node_group_info {
    instance_type   = var.kafka_instance_type
    client_subnets  = var.kafka_client_subnets
    security_groups = var.kafka_security_groups

    storage_info {
      ebs_storage_info {
        # Provisioned throughput disabled; storage growth is handled by
        # the Application Auto Scaling policy in storage_autoscale.tf.
        provisioned_throughput {
          enabled = false
        }

        volume_size = var.kafka_ebs_volume_size
      }
    }
  }

  # Credentials for SCRAM come from the Secrets Manager secrets created in
  # scram-secret-association.tf and attached via the scram secret association.
  client_authentication {
    sasl {
      scram = true
    }
  }

  # Referencing the configuration's arn/revision creates an implicit
  # dependency, so no explicit depends_on on the configuration is needed.
  configuration_info {
    arn      = aws_msk_configuration.kafka_config_general.arn
    revision = aws_msk_configuration.kafka_config_general.latest_revision
  }

  open_monitoring {
    prometheus {
      jmx_exporter {
        enabled_in_broker = true
      }
      node_exporter {
        enabled_in_broker = true
      }
    }
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = var.kafka_log_group_name
      }
    }
  }

  tags = {
    type = "kafkaMSK"
  }
}

  • ./modules/my_msk/configuration.tf:
# Cluster-wide Kafka configuration (server.properties) for the broker
# version in use; cluster.tf pins its latest_revision.
resource "aws_msk_configuration" "kafka_config_general" {
  # Idiom fix: a bare variable reference is preferred over wrapping it in
  # a redundant "${...}" interpolation.
  kafka_versions    = [var.kafka_version]
  name              = "kafka-config-general"
  server_properties = var.server_properties
}

  • ./modules/my_msk/scram-secret-association.tf:


# One Secrets Manager secret per SASL/SCRAM user. MSK requires the secret
# name to start with the "AmazonMSK_" prefix, and (per AWS docs) the secret
# must be encrypted with a customer-managed KMS key rather than the
# AWS-managed default — hence the explicit kms_key_id.
resource "aws_secretsmanager_secret" "kafka_secrets" {
  for_each   = { for user in var.msk_users : user.username => user }
  name       = "AmazonMSK_${each.value.username}"
  kms_key_id = aws_kms_key.kafka_key.key_id
}

# Customer-managed KMS key encrypting the SCRAM user secrets; MSK cannot
# use the AWS-managed Secrets Manager key for SCRAM secret association.
resource "aws_kms_key" "kafka_key" {
  # Fix: replaced the copy-pasted "Example Key" description with one that
  # identifies the key's actual purpose.
  description = "KMS key for MSK SCRAM user secrets"
}

# Secret payload in the {"username": ..., "password": ...} JSON shape
# that MSK expects for SASL/SCRAM credentials.
resource "aws_secretsmanager_secret_version" "kafka_secrets" {
  for_each      = { for user in var.msk_users : user.username => user }
  # Idiom fix: direct references instead of redundant "${...}" wrapping;
  # the secret_id reference already creates the dependency on the secret,
  # so the previous explicit depends_on was redundant and removed.
  secret_id     = aws_secretsmanager_secret.kafka_secrets[each.value.username].id
  secret_string = jsonencode({ username = each.value.username, password = each.value.password })
}

# Resource policy letting the MSK service principal read each SCRAM
# secret; required for the scram secret association to work.
resource "aws_secretsmanager_secret_policy" "kafka_secrets" {
  for_each   = { for user in var.msk_users : user.username => user }
  # The secret_arn reference gives the implicit dependency on the secret,
  # so no explicit depends_on is needed.
  secret_arn = aws_secretsmanager_secret.kafka_secrets[each.value.username].arn

  # jsonencode is preferred over a heredoc: it guarantees valid JSON and
  # avoids interpolation quoting mistakes. Action casing corrected to the
  # documented "GetSecretValue" (IAM matches case-insensitively, so this
  # is cosmetic but matches AWS documentation).
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "AWSKafkaResourcePolicy"
        Effect = "Allow"
        Principal = {
          Service = "kafka.amazonaws.com"
        }
        Action   = "secretsmanager:GetSecretValue"
        Resource = aws_secretsmanager_secret.kafka_secrets[each.value.username].arn
      }
    ]
  })
}
# Attach every user secret to the cluster so brokers accept those
# SASL/SCRAM credentials.
resource "aws_msk_scram_secret_association" "kafka" {
  cluster_arn = aws_msk_cluster.kafka.arn
  # Idiom fix: the comprehension already yields a flat list of strings, so
  # the flatten() wrapper and "${...}" interpolation were redundant.
  secret_arn_list = [
    for user in var.msk_users :
    aws_secretsmanager_secret.kafka_secrets[user.username].arn
  ]

  # This depends_on is genuinely required: nothing above references the
  # secret *versions*, but the association fails if a secret has no value.
  depends_on = [aws_secretsmanager_secret_version.kafka_secrets]
}

  • ./modules/my_msk/storage_autoscale.tf:
# Registers MSK broker storage with Application Auto Scaling so EBS
# volumes can grow automatically up to kafka_scaling_max_capacity GiB.
resource "aws_appautoscaling_target" "kafka_storage" {
  max_capacity       = var.kafka_scaling_max_capacity
  min_capacity       = 1
  resource_id        = aws_msk_cluster.kafka.arn
  scalable_dimension = "kafka:broker-storage:VolumeSize"
  service_namespace  = "kafka"
}

# Target-tracking policy that expands broker storage when utilization
# crosses the target. NOTE(review): MSK storage autoscaling only scales
# out (volumes never shrink) — confirm against AWS docs if in doubt.
resource "aws_appautoscaling_policy" "kafka_scaling_policy" {
  name               = "nxd-kafka-broker-scaling"
  policy_type        = "TargetTrackingScaling"
  resource_id        = aws_msk_cluster.kafka.arn
  scalable_dimension = aws_appautoscaling_target.kafka_storage.scalable_dimension
  service_namespace  = aws_appautoscaling_target.kafka_storage.service_namespace

  target_tracking_scaling_policy_configuration {
    predefined_metric_specification {
      predefined_metric_type = "KafkaBrokerStorageUtilization"
    }

    # Scale when broker disk utilization exceeds 80%.
    target_value = 80
  }
}

  3. 撰寫專案相關程式
  • example.tfvars:
aws_region="ap-northeast-1"
aws_profile="<YOUR_PROFILE>"
project_name="example"
department_name="SRE"
  • main.tf:
# Root module configuration: pins the AWS provider version and stores
# state in S3 with a DynamoDB table for state locking.
terraform {
  required_providers {
    aws = {
      version = "5.15.0"
    }
  }

  backend "s3" {
    bucket                  = "<YOUR_S3_BUCKET_NAME>"
    dynamodb_table          = "<YOUR_DYNAMODB_TABLE_NAME>"
    key                     = "terraform.tfstate"
    region                  = "ap-northeast-1"
    # NOTE(review): newer Terraform releases deprecate
    # shared_credentials_file in favor of shared_credentials_files
    # (list) — confirm against the Terraform version in use.
    shared_credentials_file = "~/.aws/config"
    profile                 = "<YOUR_PROFILE>"
  }
}

其他模組省略...

# msk
# MSK cluster instantiated from the local my_msk module.
module "kafka" {
  aws_profile = var.aws_profile
  aws_region  = var.aws_region

  kafka_cluster_name           = "my-kafka"
  kafka_version                = "2.8.1"
  kafka_number_of_broker_nodes = 2
  kafka_instance_type          = "kafka.t3.small"
  kafka_ebs_volume_size        = 600
  kafka_scaling_max_capacity   = 500
  kafka_client_subnets = [
    module.subnet.subnets["my-application-ap-northeast-1a"].id,
    module.subnet.subnets["my-application-ap-northeast-1c"].id,
  ]
  kafka_security_groups = ["sg-0d7ae01cd3dc3a16d"]

  server_properties = <<PROPERTIES
auto.create.topics.enable=false
default.replication.factor=2
min.insync.replicas=1
num.io.threads=8
num.network.threads=5
num.partitions=10
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
PROPERTIES

  # SECURITY: avoid committing plaintext passwords — pass them in via a
  # tfvars file excluded from version control, or a sensitive variable.
  msk_users = [
    {
      username = "msk-user"
      password = "kDzq86Y03QZ0"
    }
  ]

  # Bug fix: the module lives at ./modules/my_msk (see the directory tree
  # above); the original "./modules/nxd_msk" path does not exist.
  source = "./modules/my_msk"
}


Terraform 執行計畫

  1. 於專案目錄下執行 terraform init && terraform plan --out .plan -var-file=example.tfvars 來確認一下結果:

Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
  + create

Terraform will perform the following actions:

  # module.msk.aws_appautoscaling_policy.kafka_scaling_policy will be created
  + resource "aws_appautoscaling_policy" "kafka_scaling_policy" {
      + alarm_arns         = (known after apply)
      + arn                = (known after apply)
      + id                 = (known after apply)
      + name               = "nxd-kafka-broker-scaling"
      + policy_type        = "TargetTrackingScaling"
      + resource_id        = (known after apply)
      + scalable_dimension = "kafka:broker-storage:VolumeSize"
      + service_namespace  = "kafka"

      + target_tracking_scaling_policy_configuration {
          + disable_scale_in = false
          + target_value     = 80

          + predefined_metric_specification {
              + predefined_metric_type = "KafkaBrokerStorageUtilization"
            }
        }
    }

  # module.msk.aws_appautoscaling_target.kafka_storage will be created
  + resource "aws_appautoscaling_target" "kafka_storage" {
      + arn                = (known after apply)
      + id                 = (known after apply)
      + max_capacity       = 500
      + min_capacity       = 1
      + resource_id        = (known after apply)
      + role_arn           = (known after apply)
      + scalable_dimension = "kafka:broker-storage:VolumeSize"
      + service_namespace  = "kafka"
      + tags_all           = (known after apply)
    }

  # module.msk.aws_kms_key.kafka_key will be created
  + resource "aws_kms_key" "kafka_key" {
      + arn                                = (known after apply)
      + bypass_policy_lockout_safety_check = false
      + customer_master_key_spec           = "SYMMETRIC_DEFAULT"
      + description                        = "Example Key for MSK Cluster Scram Secret Association"
      + enable_key_rotation                = false
      + id                                 = (known after apply)
      + is_enabled                         = true
      + key_id                             = (known after apply)
      + key_usage                          = "ENCRYPT_DECRYPT"
      + multi_region                       = (known after apply)
      + policy                             = (known after apply)
      + tags_all                           = (known after apply)
    }

  # module.msk.aws_msk_cluster.kafka will be created
  + resource "aws_msk_cluster" "kafka" {
      + arn                                           = (known after apply)
      + bootstrap_brokers                             = (known after apply)
      + bootstrap_brokers_public_sasl_iam             = (known after apply)
      + bootstrap_brokers_public_sasl_scram           = (known after apply)
      + bootstrap_brokers_public_tls                  = (known after apply)
      + bootstrap_brokers_sasl_iam                    = (known after apply)
      + bootstrap_brokers_sasl_scram                  = (known after apply)
      + bootstrap_brokers_tls                         = (known after apply)
      + bootstrap_brokers_vpc_connectivity_sasl_iam   = (known after apply)
      + bootstrap_brokers_vpc_connectivity_sasl_scram = (known after apply)
      + bootstrap_brokers_vpc_connectivity_tls        = (known after apply)
      + cluster_name                                  = "my-kafka"
      + current_version                               = (known after apply)
      + enhanced_monitoring                           = "DEFAULT"
      + id                                            = (known after apply)
      + kafka_version                                 = "2.8.1"
      + number_of_broker_nodes                        = 2
      + storage_mode                                  = (known after apply)
      + tags                                          = {
          + "type" = "kafkaMSK"
        }
      + tags_all                                      = {
          + "type" = "kafkaMSK"
        }
      + zookeeper_connect_string                      = (known after apply)
      + zookeeper_connect_string_tls                  = (known after apply)

      + broker_node_group_info {
          + az_distribution = "DEFAULT"
          + client_subnets  = [
              + "subnet-068ec8e9ec1f4ed8b",
              + "subnet-0a7ba0f71e5500d41",
            ]
          + instance_type   = "kafka.t3.small"
          + security_groups = [
              + "sg-0d7ae01cd3dc3a16d",
            ]

          + storage_info {
              + ebs_storage_info {
                  + volume_size = 600

                  + provisioned_throughput {
                      + enabled = false
                    }
                }
            }
        }

      + client_authentication {
          + sasl {
              + scram = true
            }
        }

      + configuration_info {
          + arn      = (known after apply)
          + revision = (known after apply)
        }

      + logging_info {
          + broker_logs {
              + cloudwatch_logs {
                  + enabled   = true
                  + log_group = "/nxd/kafka"
                }
            }
        }

      + open_monitoring {
          + prometheus {
              + jmx_exporter {
                  + enabled_in_broker = true
                }
              + node_exporter {
                  + enabled_in_broker = true
                }
            }
        }
    }

  # module.msk.aws_msk_configuration.kafka_config_general will be created
  + resource "aws_msk_configuration" "kafka_config_general" {
      + arn               = (known after apply)
      + id                = (known after apply)
      + kafka_versions    = [
          + "2.8.1",
        ]
      + latest_revision   = (known after apply)
      + name              = "kafka-config-general"
      + server_properties = <<-EOT
            auto.create.topics.enable=false
            default.replication.factor=2
            min.insync.replicas=1
            num.io.threads=8
            num.network.threads=5
            num.partitions=10
            num.replica.fetchers=2
            replica.lag.time.max.ms=30000
            socket.receive.buffer.bytes=102400
            socket.request.max.bytes=104857600
            socket.send.buffer.bytes=102400
            unclean.leader.election.enable=true
            zookeeper.session.timeout.ms=18000
        EOT
    }

  # module.msk.aws_msk_scram_secret_association.kafka will be created
  + resource "aws_msk_scram_secret_association" "kafka" {
      + cluster_arn     = (known after apply)
      + id              = (known after apply)
      + secret_arn_list = (known after apply)
    }

  # module.msk.aws_secretsmanager_secret.kafka_secrets["msk-user"] will be created
  + resource "aws_secretsmanager_secret" "kafka_secrets" {
      + arn                            = (known after apply)
      + force_overwrite_replica_secret = false
      + id                             = (known after apply)
      + kms_key_id                     = (known after apply)
      + name                           = "AmazonMSK_msk-user"
      + name_prefix                    = (known after apply)
      + policy                         = (known after apply)
      + recovery_window_in_days        = 30
      + tags_all                       = (known after apply)
    }

  # module.msk.aws_secretsmanager_secret_policy.kafka_secrets["msk-user"] will be created
  + resource "aws_secretsmanager_secret_policy" "kafka_secrets" {
      + id         = (known after apply)
      + policy     = (known after apply)
      + secret_arn = (known after apply)
    }

  # module.msk.aws_secretsmanager_secret_version.kafka_secrets["msk-user"] will be created
  + resource "aws_secretsmanager_secret_version" "kafka_secrets" {
      + arn            = (known after apply)
      + id             = (known after apply)
      + secret_id      = (known after apply)
      + secret_string  = (sensitive value)
      + version_id     = (known after apply)
      + version_stages = (known after apply)
    }

Plan: 9 to add, 0 to change, 0 to destroy.

───────────────────────────────────────────────────────────────────────────────────

Saved the plan to: .plan

To perform exactly these actions, run the following command to apply:
    terraform apply ".plan"
Releasing state lock. This may take a few moments...

下一篇文章將會展示實作 Using Packer to Create an AMI 以建置 Cassandra Cluster 為例子。


上一篇
實作 AWS 常用服務之 Terraform 模組系列 - KMS 篇
下一篇
實作 AWS 常用服務之 Terraform 模組系列 - Using Packer to Create an AMI 篇以建置 Cassandra Cluster 為例子
系列文
大家都在用 Terraform 實作 IaC 為什麼不將程式寫得更簡潔易讀呢?30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言